[アップデート] データストア間の連携が容易に!Amazon QLDB ストリームが利用可能になりました
先日のアップデートで Amazon Quantum Ledger Database (以下、QLDB)でリアルタイムストリーミングがサポートされました。
何がうれしいのか
QLDB とは
QLDB は「ジャーナル」と呼ばれる、トランザクションデータと関連するメタデータを記録するデータベースです。このメタデータにはクエリなどの内容も含まれています。そのため、一般的な RDB や KVS のデータストアでは難しいとされる過去の特定時点におけるデータの参照であったり、いつどのようにデータが変更されたかを参照することに長けています。
(RDB でもトランザクションログから追いかけることは可能ですが、あくまで管理者が参照するものであって一般的にはユーザーがログを参照することは出来ません。)
また、これらのデータは順序付けとハッシュチェーンによって、そのデータが改ざんされていないことを容易に検証できる仕組みになっています。
QLDB のパフォーマンス
QLDB は中央集権型のハッシューチェーンであり、分散型の Amazon Managed Blockchain(以下、AMB) とよく比較されます。対 AMB においては 2〜3 倍のトランザクション処理が可能であることは公式ページにも記載があります。
一方でその他のデータベースサービスと比較してどうか?という情報はありません。そもそも QLDB は先述のとおりジャーナルのデータベースあり、RDB や KVS と用途が違うため比較するものではありません。最新のデータ参照のパフォーマンスを期待するのであれば、Amazon RDS や Amazon DynamoDB を、複雑なクエリでの参照を期待するのであれば Amazon Elasticsearch Service というように役割に応じて使い分けることが望ましいでしょう。
そこで今回の QLDB ストリーム が活きてきます。
QLDB ストリームとは
QLDB ストリームは re:Invent 2019 でひっそりとパブリックプレビューとして発表された機能です。
既にある DynamoDB ストリームをご存知であればイメージは同じです。QLDB がジャーナルをコミットしたタイミングでダウンストリームにイベントを流すことが可能となります。
例えば先述のようにサプライチェーンのようなトラッキングや、監査のような不改ざんの検証には QLDB を利用しつつ、最新データの参照は DynamoDB に任せることでパフォーマンスが期待できます。Firehose を挟むことで S3 や Elasticsearch Service にデータを放り込むことも可能です。
そのほかにも QLDB へのコミットをトリガーに Lambda を実行し通知することで、受発注システムに組み込むことも期待できます。
このように QLDB ストリーム対応によって、アーキテクチャの幅がかなり広がりますね!
注意点
QLDBストリームは、少なくとも1回の配信を保証します。QLDBストリームによって生成される各 データレコードは、少なくとも1回はKinesis Data Streamsに配信されます。同じレコードがKinesisデータストリームに複数回現れることがあります。したがって、ユースケースで必要な場合は、コンシューマアプリケーションレイヤーに重複排除ロジックが必要です。またストリームデータの順序は保証されていないので、順序が求められる場合はロジックが必要です。
このあたりの仕組みは公式のサンプルアプリケーション「amazon-qldb-streaming-amazon-elasticsearch-sample-python」を参照ください。
QLDB は安い!?
AMB の場合、メンバーシップ料金およびピアノードの起動は時間課金ですので、データの流量が少なくともそれなりにランニングコストが掛かって来ますが、QLDB には時間課金はありません。
ストレージ料金およびデータ I/O 料金、データ転送料金のみです。
データベースストレージおよび IO 料金
(執筆時点の東京リージョン料金)
項目 | 料金 |
---|---|
書き込み I/O | 100 万件のリクエストあたり 0.799USD |
読み込み I/O | 100 万件のリクエストあたり 0.155USD |
ジャーナルストレージ料金 | 0.034USD/GB-月 |
インデックス化ストレージ料金 | 0.285USD/GB-月 |
データ転送料金
QLDB と同じ AWS リージョンの他の AWS のサービスの間で転送されたデータは無料です。
Data Transfer OUT From Amazon QLDB To Internet | 料金 |
---|---|
1 GB /月まで | 0.00USD/GB |
その後 9.999 TB /月まで | 0.114USD/GB |
その後 40 TB /月まで | 0.089USD/GB |
その後 100 TB /月まで | 0.086USD/GB |
150 TB/月を上回る場合 | 0.084USD/GB |
この料金で追跡可能なデータベースが手に入り、且つ、そのデータの不改ざんを容易に検証できるのであればお安い気もします。ただ、従来のデータベースのように削除という概念がなく、追記型ですのでジャーナルストレージは年々増加していくことになるかと思いますので、長期的なランニングコストがどれくらいになるかは、早い段階でログの流量などから試算されるのが良いでしょう。
やってみる
前置きが長くなりましたが、それでは QLDB ストリームを試していきましょう。今回は QLDB ストリームを作成して、Lambda でイベントを確認するところまでやってみたいと思います。
Kinesis Data Streams の作成
QLDB ストリームが指定できるダウンストリームは Kinesis Data Streams です。まずは Kinesis 側の作成をすませておきます。
IAM ロールの作成
QLDB が Kinesis Data Streams に PutRecord するためのポリシーを作成します。"Resource": "*"
としていますが必要に応じて対象となる Kinesis Data Streams は絞り込んでください。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "QLDBStreamKinesisPermissions", "Action": [ "kinesis:PutRecord*", "kinesis:DescribeStream", "kinesis:ListShards" ], "Effect": "Allow", "Resource": "*" } ] }
IAM ロールには以下の信頼関係ポリシーを設定します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "qldb.amazonaws.com" }, "Action": [ "sts:AssumeRole" ] } ] }
QLDB ストリーミングの設定
QLDB は公式ガイドのサンプルを使ってテーブル、およびインデックス作成しました。
QLDB コンソールから [ストリーム] を開き、[QLDB ストリームを作成] をクリックします。
任意のストリーム名を入力。ソースデータには事前に作成しておいた台帳を指定します。次に日時指定があります。今回は開始時点は自動入力のまま使用しますが、ここが QLDB ストリーミングの素晴らしいポイントの 1 つです。QLDB ストリーミングでは、イベントデータの履歴を再生することで過去の任意の時点の QLDB ストリームを開始することも出来ます。
例えば、あとから新しい指標で分析したくなったときに、QLDB を経由させておけば過去のストリームデータを取り出して分析することも出来るということですね。素晴らしい。
送信先ストリームは事前に作成しておいた Kinesis Data Streams を指定します。[Kinesis Data Streams でレコード集計を有効にする] にチェックを入れると、QLDB が複数のストリームレコードを単一の Kinesis Data Streams レコードに発行できるようになります。一度の API コールで送信されるレコード数が増えるので、プロデューサーのスループット効率をあげたい場合に利用できます。今回は簡単にイベントを確認したいので、データ集約は利用しないようにチェックを外しています。
IAM ロールについても事前に作成しておいたものを指定し、[QLDB ストリームを作成] をクリックします。
はい、これで QLDB ストリームの設定は完了です。
確認
Lambda 関数
それでは Lambda を使って、どのように QLDB ストリームイベントが流れてくるか確認します。今回 boto3(Python3.7) で以下のように Kinesis Data Streams のイベントを取得して、print するだけの関数を用意しました。
import base64 import boto3 import amazon.ion.simpleion as ion def lambda_handler(event, context): for record in event['Records']: # Kinesis data in Python Lambdas is base64 encoded payload = base64.b64decode(record['kinesis']['data']) # payload is the actual ion binary record published by QLDB to the stream ion_record = ion.loads(payload) print("Ion reocord: ", (ion.dumps(ion_record, binary=False)))
QLDB は JSON ではなく構造化データと非構造化データを保存できる Ion 形式を採用していますので、amazon.ion.simpleion
を import しています。今回は以下のように zip 化したものを Lambda Layer に登録して利用しています。
$ mkdir python $ pip install -t ./python amazon.ion $ zip -r amazon_ion.zip python
作成した Lambda のトリガーに先ほど作成した Kinesis Data Streams を設定します。Lambda の IAM ロールには Kinesis Data Streams から GetRecord できるようにポリシーをアタッチしてください。このあたりの設定は今回の本筋ではないため割愛しますので、詳細は公式ガイドを参照ください。
データの挿入
それでは QLDB クエリエディタを使って、レコードを挿入してみます。
INSERT INTO VehicleRegistration VALUE { 'VIN' : '1HVBBAANXWH544237', 'LicensePlateNumber' : 'LS477D', 'State' : 'WA', 'City' : 'Tacoma', 'PendingPenaltyTicketAmount' : 42.20, 'ValidFromDate' : `2011-10-26T`, 'ValidToDate' : `2023-09-25T`, 'Owners' : { 'PrimaryOwner' : { 'PersonId': '' }, 'SecondaryOwners' : [] } }
Lambda の実行ログを確認してみると、上記トランザクションに対して 5 つのレコードが出力されているようです。
レコードタイプ BLOCK_SUMMARY
では 25〜52 行目をみると、クエリ内容がジャーナルに含まれていることが判ります。
Ion reocord: $ion_1_0 { qldbStreamArn:"arn:aws:qldb:ap-northeast-1:xxxxxxxxxxxx:stream/test-qldb/I1pGyZ5iQbGBIrHwO56Wp5", recordType:"BLOCK_SUMMARY", payload:{ blockAddress:{ strandId:"LI6CnupHiDfLrVaMmERHBl", sequenceNo:96 }, transactionId:"9s8wPA8PmNM4HhCEsK0iTL", blockTimestamp:2020-05-24T10:24:16.906Z,blockHash:{{ Y/QjvPI84Czcp9dQJkDSg0Yp2sQDM71ew3fnT4mOjy4= }}, entriesHash:{{ pz8OfNIdynuJls20loKtiFR2/mtB5ZzDewCNrOnBeG0= }}, previousBlockHash:{{ 5VckqdXcr7fb3bp0C+xVn9dHetDlFtHmKcHxwLmd78k= }}, entriesHashList:[ {{HBVFR9YjUiSYmvbFjeWT/5meNh3bWAfRyE2hKckkzeA=}}, {{fnCmHOvOpa4DNFmw15sFfpM14nX8AsBJFql714zCzjE=}}, {{1wmnlLbLwo+n6oiVdZVCSW62EwWd/3NDY1orqntCeVQ=}}, {{05bSBecwOK5fj9GtL9U+Nr4ybiufMRDPogSyXwsnD68=}} ], transactionInfo:{ statements:[{ statement:"INSERT INTO VehicleRegistration VALUE\n{\n 'VIN' : '1HVBBAANXWH544237',\n 'LicensePlateNumber' : 'LS477D',\n 'State' : 'WA',\n 'City' : 'Tacoma',\n 'PendingPenaltyTicketAmount' : 42.20,\n 'ValidFromDate' : `2011-10-26T`,\n 'ValidToDate' : `2023-09-25T`,\n 'Owners' : {\n 'PrimaryOwner' : { 'PersonId': '' },\n 'SecondaryOwners' : []\n }\n }", startTime:2020-05-24T10:24:16.846Z, statementDigest:{{ EjOTZcSk4v/c1foe4wOag0b3ItwMIt57O6l3kxpN71w= }} }], documents:{ Dp04PKzLnsm9k3cVOyzN0O:{ tableName:"VehicleRegistration", tableId:"2kKu5y98x4Z2QYFiGrhvks", statements:[0] } } }, revisionSummaries:[{ hash:{{fnCmHOvOpa4DNFmw15sFfpM14nX8AsBJFql714zCzjE=}}, documentId:"Dp04PKzLnsm9k3cVOyzN0O" }] } }
次に、レコードタイプ REVISION_DETAILS
ではデータの値が含まれています。このレコードタイプの data
を使って DynamoDB を更新するような処理を仕込んでやれば、最新状態のデータは QLDB 以外のデータストアで参照するようなアーキテクチャを実装できそうです。 (ただし先述のとおり、イベントが重複配信されることを想定し、冪等性のロジックは必要です)
Ion reocord: $ion_1_0 { qldbStreamArn:"arn:aws:qldb:ap-northeast-1:xxxxxxxxxxxx:stream/test-qldb/I1pGyZ5iQbGBIrHwO56Wp5", recordType:"REVISION_DETAILS", payload:{ tableInfo:{ tableName:"VehicleRegistration", tableId:"2kKu5y98x4Z2QYFiGrhvks" }, revision:{ blockAddress:{ strandId:"LI6CnupHiDfLrVaMmERHBl", sequenceNo:96 }, hash:{{ fnCmHOvOpa4DNFmw15sFfpM14nX8AsBJFql714zCzjE= }}, data:{ VIN:"1HVBBAANXWH544237", LicensePlateNumber:"LS477D", State:"WA", City:"Tacoma", PendingPenaltyTicketAmount:42.20, ValidFromDate:2011-10-26T, ValidToDate:2023-09-25T, Owners:{ PrimaryOwner:{ PersonId:"" }, SecondaryOwners:[] } }, metadata:{ id:"Dp04PKzLnsm9k3cVOyzN0O", version:0, txTime:2020-05-24T10:24:16.885Z, txId:"9s8wPA8PmNM4HhCEsK0iTL" } } } }
あとの 3 つは SELECT * FROM information_schema.user_tables
が実行されていました。QLDB の仕組みを深く理解していないので、どういった目的でこのクエリが実行されるのかわかりませんが、データの挿入、更新だけでなく参照(SELECT
)のストリーミングも流れてくることは判りました。
Ion reocord: $ion_1_0 { qldbStreamArn:"arn:aws:qldb:ap-northeast-1:xxxxxxxxxxxx:stream/test-qldb/I1pGyZ5iQbGBIrHwO56Wp5", recordType:"BLOCK_SUMMARY", payload:{ blockAddress:{ strandId:"LI6CnupHiDfLrVaMmERHBl", sequenceNo:97 }, transactionId:"9MCnH3KNdq0Cq4mspOfXnO", blockTimestamp:2020-05-24T10:24:17.354000001Z, blockHash:{{VQoezchrqTxw2BlOC1ocCmjtmyRGEDFy0exUD7eY4kY=}}, entriesHash:{{Ey6NxIfGsXo7szaVzRiGCgZzAHperJE1DZCN8i2cxL8=}}, previousBlockHash:{{Y/QjvPI84Czcp9dQJkDSg0Yp2sQDM71ew3fnT4mOjy4=}}, entriesHashList:[ {{br/M8HeOHKbUap4n1e07zNgjJOppRLYoW5ygzZ4S2jI=}}, {{}}, {{ZMI1FT0gtWR/NYMjeQpWPaAgf4ABgOc0kO+4P5Rer2w=}} ], transactionInfo:{ statements:[{ statement:"SELECT * FROM information_schema.user_tables", startTime:2020-05-24T10:24:17.314Z, statementDigest:{{vrvWiTj+6MEi8QmaEflsEgf4p3SBk7HREoAXijhK9vc=}} }] } } }
ちなみに SELECT * FROM information_schema.user_tables
は以下のようなデータの取得になります。
データの更新
ここからは QLDB ストリームというよりも、せっかくなので QLDB がトレーサビリティおよび不改ざん性を紹介したいと思います。
次に先ほど登録した車に対してオーナーを登録します。新たなオーナーとなる Raul.Lewis
の個人 ID を参照します。
SELECT metadata.id FROM _ql_committed_Person AS p WHERE p.data.FirstName = 'Raul' and p.data.LastName = 'Lewis'
id |
---|
"CcTj6PVx5Hg1Zr1BbcKS6q" |
個人 ID が判りましたので、車体番号 1N4AL11D75C109151
のオーナーとして登録します。
UPDATE VehicleRegistration AS r SET r.Owners.PrimaryOwner.PersonId = 'CcTj6PVx5Hg1Zr1BbcKS6q' WHERE r.VIN = '1N4AL11D75C109151'
documentId |
---|
"0FyBxmLQzxH0J3hHbrVIEn" |
次に同車のオーナーを Brent.Logan
に委譲するため、個人 ID を参照します。
SELECT metadata.id FROM _ql_committed_Person AS p WHERE p.data.FirstName = 'Brent' and p.data.LastName = 'Logan'
id |
---|
"47VIRbGPULk0STeGzT4fES" |
Brent.Logan
の個人 ID が判りましたので、オーナーを委譲します。あわせて市も Everett
へと変更します。
UPDATE VehicleRegistration AS r SET r.Owners.PrimaryOwner.PersonId = '47VIRbGPULk0STeGzT4fES', r.City = 'Everett' WHERE r.VIN = '1N4AL11D75C109151'
documentId |
---|
"0FyBxmLQzxH0J3hHbrVIEn" |
最新の状態を確認すると、オーナーと所在地が変更されていることが判ります。
SELECT VIN,Owners,City FROM VehicleRegistration AS r WHERE r.VIN = '1N4AL11D75C109151'
VIN | Owners | City |
---|---|---|
"1N4AL11D75C109151" | {PrimaryOwner:{PersonId:"47VIRbGPULk0STeGzT4fES"},SecondaryOwners:[]} | "Everett" |
データを検証する
先ほどのトランザクションに対するストリームイベントは下記のとおりです。
Ion reocord: $ion_1_0 { qldbStreamArn:"arn:aws:qldb:ap-northeast-1:xxxxxxxxxxxx:stream/test-qldb/I1pGyZ5iQbGBIrHwO56Wp5", recordType:"BLOCK_SUMMARY", payload:{ blockAddress:{ strandId:"LI6CnupHiDfLrVaMmERHBl", sequenceNo:134 }, transactionId:"0bGI3BoBWiXEnhDRhK6q18", blockTimestamp:2020-05-24T13:30:42.461000001Z, blockHash:{{NwlHJY5tp6xHwoBkVnsQFm3fZx8br7E0LzkoTe7wNkk=}}, entriesHash:{{lc0hFDdLvPVqsO68EI5JpT/wPwTr0WARvf81rypmHHs=}}, previousBlockHash:{{P+DpDHA1go4U6RauMn/gKDukDLd4RbcrKyTUUygZDxQ=}}, entriesHashList:[ {{86y3GGWA21a/m7Ri7vaSFoainVVM+HkXGJ/xn3UOoxc=}}, {{WnCWqGmG4qI0aomhLRheVb3SyvMXjK58aIeMd9L/0dM=}}, {{NCUauWn8mWiKMrZHrE6LSd8irM5qI4HoWy377681t9I=}}, {{wZ//kQG/0LwxGQ5NXF3d4ota+fIBrqXMb7W8y6//WH0=}} ], transactionInfo:{ statements:[{ statement:"UPDATE VehicleRegistration AS r\n SET r.Owners.PrimaryOwner.PersonId = '47VIRbGPULk0STeGzT4fES',\n r.City = 'Everett'\n WHERE r.VIN = '1N4AL11D75C109151'", startTime:2020-05-24T13:30:42.386Z, statementDigest:{{tuCXaIQ9Ahf8tW8i0jevN3qWZ4MNMsVxVFz67xueovY=}} }], documents:{ '0FyBxmLQzxH0J3hHbrVIEn':{ tableName:"VehicleRegistration", tableId:"2kKu5y98x4Z2QYFiGrhvks", statements:[0,0] } } }, revisionSummaries:[{ hash:{{WnCWqGmG4qI0aomhLRheVb3SyvMXjK58aIeMd9L/0dM=}}, documentId:"0FyBxmLQzxH0J3hHbrVIEn" }] } }
blockAddress
と documentId
を指定して、トランザクション時点のドキュメントを検証してみましょう。
QLDB 管理コンソールから [台帳] を開き、対象の台帳を選択し [ダイジェストを取得] をクリックします。
ダイジェスト情報が表示されますので [保存] します。
次に [検証] を開き、検証する台帳を指定。検証したいブロックアドレスおよびドキュメント ID を指定し、先ほど取得したダイジェストを選択すると、ダイジェストおよびダイジェストヒントアドレスが入力されますので、[検証] をクリックします。
ハッシュ計算によって算出されたダイジェストと、検証値として渡したダイジェストが合致することが確認できました。つまり、このデータが改ざんされていないことの証明ができたということになります。加えて、仮にデータが改ざんしたとしても容易に検証されてしまうことが判ります。
また、[ブロック] タブを開くと、当該ブロックアドレスに含まれるトランザクションを容易に確認することができます。これが、QLDB がトレーサビリティに優れているといわれるところですね。
検証は以上です!
さいごに
これまで QLDB のトレーサビリティや不改ざん性に魅力を感じつつも、従来と異なるデータの取り扱いに躊躇されている方も少なくないかと思いますが、QLDB ストリームによってジャーナルは QLDB で管理しつつ、最新データの参照などは従来のデータストアを利用する、といったアーキテクチャが可能となりました。
QLDB はデータ量、IO回数の従量課金ですので、PoC などの初期コストはお安くはじめることが出来ます。
ハッシュチェーンを利用した従来とは異なるデータの持ち方を活用して、RDBMS や KVS では実現できなったサービスを考えてみては如何でしょうか!?
以上!大阪オフィスの丸毛(@marumo1981)でした!